Full pipeline construction in the cloud to determine the best locations for e-scooters based on weather and crowd predictions

Gans (an e-scooter rental company) has seen that its operational success depends on having its e-scooters parked where users need them. This project's task is to collect data from external sources that can potentially help Gans predict e-scooter movement, by creating a fully automated system in the cloud.

Introduction

Ideally, scooters get rearranged organically: some users move from point A to point B, and an equal number of users then move from point B to point A. However, certain elements create asymmetries. Here are some of them:

There are some actions that the company can perform to solve these asymmetries, namely:

Objective

Since the data is needed every day, in real time, and must be accessible by everyone in the company, the challenge is to assemble and automate a data pipeline in the cloud. Achieving this required the following:

1) Write Python scripts to collect data from different API sources
2) Create a relational database (DB) from scratch using MySQL on the Amazon Relational Database Service (AWS RDS)
3) Automate the data collection into the DB using AWS Lambda functions

Initial Data Collection

NOTE: Due to the limitations of the free API tiers, this project used a sample of 15 cities to collect the geographical, airport, and weather information, and just 1 airport to collect the flight data.

Pre-requirements:

1) CSV Files

  1. Download the world cities CSV from GeoNames, which contains the unique geonameid for each country/city pair (world_cities.csv)
  2. For this project, a CSV file was created with a sample of 15 of the most populated European cities (name,country).

CSV example:

name,country
London,United Kingdom
Berlin,Germany
Madrid,Spain
Paris,France
Bucharest,Romania
Budapest,Hungary
Hamburg,Germany
Warsaw,Poland
Vienna,Austria
Barcelona,Spain
Stockholm,Sweden
Belgrade,Serbia
Munich,Germany
Rome,Italy
Porto,Portugal

2) API data source accounts

  1. GeoNames geographical database API service (For more details, please visit their webpage: https://www.geonames.org/)
  2. OpenWeather Weather database API service (For more details, please visit their webpage: https://home.openweathermap.org/)
  3. RapidAPI Airports and Flights database API service (For more details, please visit their webpage: https://rapidapi.com/)

3) config File

The config.py file holds all your secret information (API keys, API user names, and the database access parameters), assigned to different variables that are called in the main scripts.

Cities Data:

The following code was used to collect the geographical data and generate the CSV file that will be used to fill the cities table in the database.

Description:

  1. Compare our 15 sample cities against the world cities list to determine each geonameid, which will become the primary key of the cities table
  2. Using the 15 geonameids, request all the geographical information (JSON) via the GeoNames API, select the required fields from it, and create a dataframe.
  3. Create a new CSV file from the geo dataframe to be used later.

Code:

import geocoder
import pandas as pd
import config as cfg

wc = pd.read_csv('world_cities.csv')
sample = pd.read_csv('EuroCitiesPopulation15_.csv')
countries = sample.country
cities = sample.name

# Match each sample city to its GeoNames geonameid (the future primary key)
city_id = []
for co, ci in zip(countries, cities):
    city_id.append(wc.set_index('country').loc[co].set_index('name').loc[ci].geonameid)

lat, lng, population, country_code, time_zone = [], [], [], [], []
east, south, north, west = [], [], [], []

for c in city_id:

    # DATA COLLECTION:

    g = geocoder.geonames(c, method='details', key=cfg.GEO_USERNAME)
    props = g.geojson['features'][0]['properties']
    lat.append(props['lat'])
    lng.append(props['lng'])
    population.append(props['population'])
    country_code.append(props['country_code'])
    # GMT offset in hours converted to seconds (used later to localize timestamps)
    time_zone.append(3600 * props['raw']['timezone']['gmtOffset'])
    east.append(props['raw']['bbox']['east'])
    south.append(props['raw']['bbox']['south'])
    north.append(props['raw']['bbox']['north'])
    west.append(props['raw']['bbox']['west'])

cities_dic = {'city_id': city_id,
              'city': cities, 
              'country':countries, 
              'Code': country_code, 
              'Population': population, 
              'Time_Zone': time_zone, 
              'Latitud': lat, 
              'Logitud': lng, 
              'East': east, 
              'South': south, 
              'North': north, 
              'West': west}
cities_df  = pd.DataFrame.from_dict(cities_dic)
cities_df.to_csv('cities.csv', index=False)

Airports:

The following code was used to collect the airports data and generate the CSV file that will be used to fill the airports table in the database.

Description:

  1. Using the cities CSV created before, request all the airport information (JSON) via RapidAPI, select the required fields from it, and create a dataframe.
  2. Create a new CSV file from the airports dataframe to be used later for the database.

Code:

import requests
import datetime
import pandas as pd
import numpy as np
import config as cfg

cities_df = pd.read_csv('cities.csv')
cities = cities_df['city']

airport_rows = []

for city in cities:
    row = cities_df.loc[cities_df['city'] == city].iloc[0]
    lat = row['Latitud']
    lon = row['Logitud']

    # Search all airports within a 50 km radius of the city centre
    url = f"https://aerodatabox.p.rapidapi.com/airports/search/location/{lat}/{lon}/km/50/16"

    querystring = {"withFlightInfoOnly": "0"}

    headers = {
                "X-RapidAPI-Key": cfg.RAPIDAPI_KEY,
                "X-RapidAPI-Host": "aerodatabox.p.rapidapi.com"
            }

    response = requests.get(url, headers=headers, params=querystring)
    print('Status code', response.status_code)
    airp_js = response.json()

    for a in airp_js["items"]:
        airport_rows.append({'city': city,
                             'city_id': row['city_id'],
                             'lat': a["location"]["lat"],
                             'lon': a["location"]["lon"],
                             'icao': a["icao"],
                             'iata': a["iata"],
                             'name': a["name"]})

airports = pd.DataFrame(airport_rows,
                        columns=['city', 'city_id', 'lat', 'lon', 'icao', 'iata', 'name'])
airports.to_csv('airports_new.csv', index=False)

Flights and Weather:

Collecting the flights and weather data was placed at the end of the pipeline, in the AWS Lambda automated data collection, because it requires the cities and airports information from the database.

Create a database model on the AWS cloud for a MySQL instance

Pre-requirements:

  1. Create an AWS cloud account
  2. Create an Amazon Relational Database Service (AWS RDS) MySQL instance.
  3. MySQL Workbench properly configured and connected to the AWS RDS DB

Create the database structure:

Using the following SQL code, create the database and all the table schemas.

SQL code:

CREATE DATABASE gans;
USE gans;


CREATE TABLE IF NOT EXISTS cities (
    city_id INT,
    city VARCHAR(200),
    country VARCHAR(200),
    country_code CHAR(2),
    population INT,
    time_zone INT,
    latitude FLOAT,
    longitude FLOAT,
    PRIMARY KEY(city_id)
);

SELECT * FROM cities;


CREATE TABLE IF NOT EXISTS weathers (
    weather_id INT auto_increment,
    city_id INT,
    time_utc INT,
    local_time CHAR(20),
    temperature FLOAT,
    humidity FLOAT,
    cloudiness_pc INT,
    wind_speed FLOAT,
    precipitation_prob FLOAT,
    rain_volume FLOAT,
    snow_volume FLOAT,
    PRIMARY KEY(weather_id),
    FOREIGN KEY(city_id) REFERENCES cities(city_id)
);

SELECT * FROM weathers;



CREATE TABLE IF NOT EXISTS airports (
    city_id INT,
    lat FLOAT,
    lon FLOAT,
    icao CHAR(4),
    iata CHAR(3),
    name VARCHAR(200),
    PRIMARY KEY(icao),
    FOREIGN KEY(city_id) REFERENCES cities(city_id)
);

SELECT * FROM airports;




CREATE TABLE IF NOT EXISTS flights (
    flights_id INT auto_increment,
    icao CHAR(4),
    date CHAR(10),
    hour_day CHAR(5),
    num_of_arriv INT,
    num_of_depart INT,
    PRIMARY KEY(flights_id),
    FOREIGN KEY(icao) REFERENCES airports(icao)
);

SELECT * FROM flights;

DB Schema

GansDB_schema.png

The picture shown above graphically represents these four tables and their relations. Primary keys are indicated by the yellow symbols; foreign keys are indicated by the pink diamonds. Every foreign key is the primary key of another table.
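These relations can be exercised with a simple join. Below is a minimal, self-contained sketch using Python's built-in sqlite3 in place of MySQL, with hypothetical sample rows, to show how a weathers row resolves to its city through the city_id foreign key:

```python
import sqlite3

con = sqlite3.connect(":memory:")  # stand-in for the MySQL connection
con.executescript("""
CREATE TABLE cities (city_id INT PRIMARY KEY, city TEXT);
CREATE TABLE weathers (
    weather_id INTEGER PRIMARY KEY AUTOINCREMENT,
    city_id INT REFERENCES cities(city_id),
    local_time TEXT,
    temperature REAL
);
INSERT INTO cities VALUES (2643743, 'London'), (2950159, 'Berlin');
INSERT INTO weathers (city_id, local_time, temperature) VALUES
    (2643743, '2022-06-01 12:00:00', 18.5),
    (2950159, '2022-06-01 12:00:00', 21.0);
""")

# Each forecast row is joined back to its city via the city_id foreign key
rows = con.execute("""
    SELECT c.city, w.local_time, w.temperature
    FROM weathers w
    JOIN cities c ON c.city_id = w.city_id
    ORDER BY c.city
""").fetchall()
print(rows)
# [('Berlin', '2022-06-01 12:00:00', 21.0), ('London', '2022-06-01 12:00:00', 18.5)]
```

The same query works unchanged against the MySQL instance once the tables are filled.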

Filling the DB with the Initial Data Collection:

Running the following Python script pushes the dataframes from the CSV files into the cities and airports tables in the AWS DB.

Description:

  1. Establishes a connection to the RDS instance.
  2. Inserts cities.csv into the cities table and airports_new.csv into the airports table inside the database.

Code:
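The original upload script is not reproduced in this document; the following is a minimal sketch of the idea. For illustration it writes to an in-memory SQLite database, and the sample rows are hypothetical stand-ins for cities.csv and airports_new.csv; in the real pipeline, the connection would be the MySQL string built from the config.py parameters and the dataframes would come from pd.read_csv.

```python
import sqlite3
import pandas as pd

# In the project the connection is the MySQL string from config.py, e.g.:
# con = f"mysql+pymysql://{user}:{password}@{host}:{port}/gans"
con = sqlite3.connect(":memory:")

# Hypothetical stand-ins for pd.read_csv('cities.csv') / pd.read_csv('airports_new.csv')
cities_df = pd.DataFrame({'city_id': [2643743], 'city': ['London'],
                          'country': ['United Kingdom'], 'Code': ['GB'],
                          'Population': [7556900], 'Time_Zone': [0],
                          'Latitud': [51.50853], 'Logitud': [-0.12574]})
airports_df = pd.DataFrame({'city': ['London'], 'city_id': [2643743],
                            'lat': [51.4706], 'lon': [-0.461941],
                            'icao': ['EGLL'], 'iata': ['LHR'],
                            'name': ['London Heathrow']})

# Rename/drop columns so the dataframes match the table schemas defined above
cities_df = cities_df.rename(columns={'Code': 'country_code',
                                      'Population': 'population',
                                      'Time_Zone': 'time_zone',
                                      'Latitud': 'latitude',
                                      'Logitud': 'longitude'})
cities_df.to_sql('cities', con=con, if_exists='append', index=False)
airports_df.drop(columns=['city']).to_sql('airports', con=con,
                                          if_exists='append', index=False)

print(pd.read_sql('SELECT city, latitude FROM cities', con=con))
```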

AWS Lambda Data Collection Automation

Pre-requirements:

NOTE: For this project, the weather data collected is a 5-day forecast and the flights data is a 1-day forecast. Two independent Lambda functions were implemented to separate the data updates for weather and flights.

  1. Create and configure a Lambda instance in your AWS cloud account (the default timeout is 3 seconds; make sure to increase it so your code can run for about 1 minute)
  2. Create and configure 2 Lambda functions with the required layers (Lambda layers hold all the Python libraries used by the scripts in this project)
  3. Create and configure the Lambda trigger for each function (in this case, EventBridge was used to repeat every 72 hours for the weather and every 24 hours for the flights)

Code in Lambda:

It requires the following Python scripts:

1) Weather Data Collection:

This code collects the weather forecast in 3-hour intervals for the following 5 days, creates a dataframe, and pushes the data to the weathers table in the database.

1. config.py
DATABASE_HOST       = 'xxxxxxxxxxxxxxxxxxxxxx'
DATABASE_USER       = 'xxxx'
DATABASE_PASSWORD   = 'xxxxxxxxx'
DATABASE_PORT       = 'xxxx'

GEO_USERNAME        = 'xxxxxx'
WEATHER_API_KEY     = 'xxxxxx'
RAPIDAPI_KEY        = 'xxxxxx'
2. lambda_function.py
import json
import requests
from datetime import datetime
import pandas as pd
import sqlalchemy
import weather

def lambda_handler(event, context):

    weather.get_weather()

    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }
3. weather.py
import requests
from datetime import datetime
import pandas as pd
import config as cfg

def get_weather():
    schema="gans"
    host=cfg.DATABASE_HOST
    user=cfg.DATABASE_USER
    password=cfg.DATABASE_PASSWORD
    port=cfg.DATABASE_PORT
    con = f'mysql+pymysql://{user}:{password}@{host}:{port}/{schema}'

    cities_df = pd.read_sql('cities',con=con)

    cities = cities_df['city']
    API_key = cfg.WEATHER_API_KEY
    weather_rows = []

    for city in cities:
        url = f"http://api.openweathermap.org/data/2.5/forecast?q={city}&appid={API_key}&units=metric"

        response = requests.get(url)
        js = response.json()

        city_row = cities_df[cities_df['city'] == city].iloc[0]

        for x in js["list"]:
            # The "rain"/"snow" keys are only present when there was precipitation
            rain_vol = x.get("rain", {}).get("3h", 0)
            snow_vol = x.get("snow", {}).get("3h", 0)

            weather_rows.append({'city_id': city_row['city_id'],
                                 'time_utc': x["dt"],
                                 # shift UTC by the stored offset (seconds) to get local time
                                 'local_time': datetime
                                               .utcfromtimestamp(x["dt"] + int(city_row['time_zone']))
                                               .strftime('%Y-%m-%d %H:%M:%S'),
                                 'temperature': x["main"]["temp"],
                                 'humidity': x["main"]["humidity"],
                                 'cloudiness_pc': x["clouds"]["all"],
                                 'wind_speed': x["wind"]["speed"],
                                 'precipitation_prob': x["pop"],
                                 'rain_volume': rain_vol,
                                 'snow_volume': snow_vol})

    weather = pd.DataFrame(weather_rows,
                           columns=['city_id', 'time_utc', 'local_time', 'temperature',
                                    'humidity', 'cloudiness_pc', 'wind_speed',
                                    'precipitation_prob', 'rain_volume', 'snow_volume'])
    weather.to_sql('weathers',
                   if_exists='append',
                   con=con,
                   index=False)

2) Flights Data Collection:

This code collects the predicted arrival and departure flights for the next day, categorizes the results per hour of the day, creates a dataframe, and finally pushes it to the flights table in the database.

1. config.py (same file as before)
2. lambda_function.py
import json
import requests
from datetime import datetime
import pandas as pd
import sqlalchemy
import flights

def lambda_handler(event, context):

    flights.get_flights()

    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }
3. flights.py
import requests
import datetime
import pandas   as pd
import numpy    as np
import config   as cfg

def get_flights():
    schema="gans"
    host=cfg.DATABASE_HOST
    user=cfg.DATABASE_USER
    password=cfg.DATABASE_PASSWORD
    port=cfg.DATABASE_PORT
    con = f'mysql+pymysql://{user}:{password}@{host}:{port}/{schema}'

    # The full airport list could be read from the DB:
    #airports = pd.read_sql('airports', con=con)

    flights_rows = []

    hour_day = ['00-01','01-02','02-03','03-04','04-05','05-06','06-07','07-08',
                '08-09','09-10','10-11','11-12','12-13','13-14','14-15','15-16',
                '16-17','17-18','18-19','19-20','20-21','21-22','22-23','23-00']

    # Limited to 1 airport (LPPR) due to the free API tier
    air = ['LPPR']

    for a_icao in air:  # airports['icao']:

        date = (datetime.date.today() + datetime.timedelta(days=1)).strftime('%Y-%m-%d')
        # The API serves at most 12-hour windows, so the day is requested in two halves
        t1 = ['00:00', '12:00']
        t2 = ['11:59', '23:59']

        headers = {
            "X-RapidAPI-Host": "aerodatabox.p.rapidapi.com",
            "X-RapidAPI-Key": cfg.RAPIDAPI_KEY
        }
        flight_js = list()
        for i in range(2):
            url = f"https://aerodatabox.p.rapidapi.com/flights/airports/icao/{a_icao}/{date}T{t1[i]}/{date}T{t2[i]}"

            response = requests.get(url, headers=headers)
            print('Status code', response.status_code)
            if response.status_code != 200:
                print("Error communicating with the flights API")
                break
            flight_js.append(response.json())

        # CALCULATE THE ARRIVALS AND DEPARTURES PER HOUR OF THE DAY:

        def get_time(flight):
            # 'scheduledTimeLocal' looks like '2022-06-01 14:35+01:00'
            from datetime import datetime
            time_ = flight['movement']['scheduledTimeLocal'].split()[1]
            time_ = time_.split('+')[0]
            return datetime.strptime(time_, '%H:%M').time()

        def get_traffic_per_hour(times):
            per_hour = [0] * 24
            for t in times:
                per_hour[t.hour] += 1
            return per_hour

        # Iterate arrivals and departures independently so neither list is truncated
        a_times_series = []
        d_times_series = []
        for half in flight_js:
            for arr in half['arrivals']:
                a_times_series.append(get_time(arr))
            for dep in half['departures']:
                d_times_series.append(get_time(dep))

        a_l = get_traffic_per_hour(a_times_series)
        d_l = get_traffic_per_hour(d_times_series)

        # CREATE THE ROWS OF ARRIVAL & DEPARTURE FLIGHTS PER AIRPORT, DAY AND HOUR:

        for a, d, c in zip(a_l, d_l, hour_day):
            flights_rows.append({'icao': a_icao,
                                 'date': date,
                                 'hour_day': c,
                                 'num_of_arriv': a,
                                 'num_of_depart': d})

    # PUSH THE FLIGHTS DATA TO THE DATABASE (once, after all airports are processed):

    flights = pd.DataFrame(flights_rows,
                           columns=['icao', 'date', 'hour_day', 'num_of_arriv', 'num_of_depart'])
    flights.to_sql('flights',
                   if_exists='append',
                   con=con,
                   index=False)

Project Summary

The whole data engineering pipeline is now running in the cloud:

Now that the weather & flights forecasts are available in the database, updated regularly and accessible by everyone, it is easy to predict the right place and time of day to locate the e-scooters and provide valuable insights for Gans.
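As a toy sketch of the kind of downstream analysis this enables (the rows and the scoring formula below are hypothetical, not part of the project), the two forecasts can be combined to rank the hours of the day:

```python
import pandas as pd

# Hypothetical rows as they might come back from the weathers and flights tables
weather = pd.DataFrame({'hour': [8, 9, 10],
                        'precipitation_prob': [0.1, 0.7, 0.0]})
flights = pd.DataFrame({'hour': [8, 9, 10],
                        'num_of_arriv': [4, 12, 9]})

# Rank hours: many arrivals and a low rain probability suggest demand near the airport
ranked = (weather.merge(flights, on='hour')
                 .assign(score=lambda d: d.num_of_arriv * (1 - d.precipitation_prob))
                 .sort_values('score', ascending=False))
print(ranked.head(1))  # hour 10 scores highest here: 9 arrivals, no rain expected
```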